18.3 任务

我们已经知道编译器会将“go func(…)”语句翻译成newproc调用,但这中间究竟有什么不为人知的秘密?

test.go

package main

import ()

func add(x, y int) int { z := x + y return z }

func main() { x := 0x100 y := 0x200 go add(x, y) }

尽管这个示例有些简陋,但这不重要,重要的是编译器要做什么。

$ go build -o test test.go

$ go tool objdump -s “main.main” test

TEXT main.main(SB) test.go test.go:10 SUBQ 0x100, CX test.go:12 MOVQ 0x18, 0(SP) // 参数长度入栈 test.go:13 LEAQ 0x879ff(IP), AX // 将函数 add 地址存入 AX 寄存器 test.go:13 MOVQ AX, 0x8(SP) // 地址入栈 test.go:13 CALL runtime.newproc(SB) test.go:14 ADDQ $0x28, SP test.go:14 RET

从反汇编代码可以看出,Go采用了类似C/cdecl的调用约定。由调用方负责提供参数空间,并从右往左入栈。

proc1.go

func newproc(siz int32, fn *funcval) { // 获取第一参数地址 argp := add(unsafe.Pointer(&fn), ptrSize)

// 获取调用方 PC/IP 寄存器值 pc := getcallerpc(unsafe.Pointer(&siz))

// 用 g0 栈创建 G/goroutine 对象 systemstack(func() { newproc1(fn, (*uint8)(argp), siz, 0, pc) }) }

目标函数newproc只有两个参数,但main却向栈压入了四个值。按照顺序,后三个值应该会被合并成funcval。还有,add返回值会被忽略。

runtime2.go

type funcval struct { fn uintptr // variable-size, fn-specific data here }

果然是变长结构类型(目标函数参数不定),此处其补全状态应该是:

type struct { fn uintptr x int y int }

如此一来,关于“go语句会复制参数值”的规则就很好理解了。站在newproc角度,我们可以画出执行栈的状态示意图。

lower SP +------------+ | | ^ +------------+ — newproc frame . | pc/ip | . +------------+ . | siz | address +------------+ ---+ . | add | | . +------------+ | . | x | > fn . +------------+ | | y | | higher +------------+ ---+

用“fn+ptrsize”跳过add获得第一个参数x的地址,getcallerpc用“siz-8”读取CALL指令压入的main PC/IP寄存器值,这就是newproc为newproc1准备的相关参数值。

asm_amd64.s

TEXT runtime·getcallerpc(SB),NOSPLIT,$8-16 MOVQ argp+0(FP),AX // addr of first arg MOVQ -8(AX),AX // get calling pc CMPQ AX, runtime·stackBarrierPC(SB) JNE nobar CALL runtime·nextBarrierPC(SB) // Get original return PC. MOVQ 0(SP), AX nobar: MOVQ AX, ret+8(FP) RET

至此,我们大概知道go语句编译后的真实模样。接下来,就转到newproc1看看如何创建并发任务单元G。

runtime2.go

type g struct { stack stack // 执行栈 sched gobuf // 用于保存执行现场 goid int64 // 唯一序号 gopc uintptr // 调用者 PC/IP startpc uintptr // 任务函数 }

proc1.go

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g { g := getg()

//“参数 + 返回值”所需空间(对齐) siz := narg + nret siz = (siz + 7) &^ 7

// 从当前 P 复用链表获取空闲的 G 对象 p := g.m.p.ptr() newg := gfget(p)

// 获取失败,新建 if newg == nil { newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) allgadd(newg) }

// 测试 G stack if newg.stack.hi == 0 { throw(“newproc1: newg missing stack”) }

// 测试 G status if readgstatus(newg) != _Gdead { throw(“newproc1: new g is not Gdead”) }

// 计算所需空间大小,并对齐 totalSize := 4*regSize + uintptr(siz) totalSize += -totalSize & (spAlign - 1)

// 确定 SP 和参数入栈位置 sp := newg.stack.hi - totalSize spArg := sp

// 将执行参数拷贝入栈 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))

// 初始化用于保存执行现场的区域 memclr(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.sched.pc = funcPC(goexit) + _PCQuantum newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn)

// 初始化基本状态 newg.gopc = callerpc newg.startpc = fn.fn casgstatus(newg, _Gdead, _Grunnable)

// 设置唯一 id if p.goidcache == p.goidcacheend { // sched.goidgen 是一个全局计数器 // 每次取回一段有效区间,然后在该区间分配,避免频繁地去全局操作 // [sched.goidgen+1, sched.goidgen+GoidCacheBatch] p.goidcache = xadd64(&sched.goidgen, _GoidCacheBatch) p.goidcache -= _GoidCacheBatch - 1 p.goidcacheend = p.goidcache + _GoidCacheBatch } newg.goid = int64(p.goidcache) p.goidcache++

// 将 G 放入待运行队列 runqput(p, newg, true)

// 如果有其他空闲 P,则尝试唤醒某个 M 出来执行任务 // 如果有 M 处于自旋等待 P 或 G 状态,放弃 // 如果当前创建的是 main goroutine (runtime.main),那么还没有其他任务需要执行,放弃 if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { wakep() }

return newg }

整个创建过程中,有一系列问题需要分开细说。

首先,G对象默认会复用,这看上去有点像cache/object做法。除P本地的复用链表外,还有全局链表在多个P之间共享。

runtime2.go

type p struct { gfree *g gfreecnt int32 }

type schedt struct { gfree *g ngfree int32 }

proc1.go

func gfget(p *p) *g { retry: // 从 P 本地队列提取复用对象 gp := p.gfree

// 如果提取失败,尝试从全局链表转移一批到 P 本地 if gp == nil && sched.gfree != nil { // 最多转移 32 个 for p.gfreecnt < 32 && sched.gfree != nil { p.gfreecnt++ gp = sched.gfree sched.gfree = gp.schedlink.ptr() sched.ngfree— gp.schedlink.set(p.gfree) p.gfree = gp }

 // 再试 
 goto retry 

}

// 如果成功获取复用对象 if gp != nil { // 调整 P 复用链表 p.gfree = gp.schedlink.ptr() p.gfreecnt—

 // 检查 G stack 
 if gp.stack.lo == 0 { 
     // 分配新栈 
     systemstack(func() { 
         gp.stack, gp.stkbar = stackalloc(_FixedStack) 
     }) 
     gp.stackguard0 = gp.stack.lo + _StackGuard 
     gp.stackAlloc = _FixedStack 
 } else { 
 } 

}

return gp }

而当goroutine执行完毕,调度器相关函数会将G对象放回P复用链表。

proc1.go

func gfput(p *p, gp *g) { // 如果栈发生过扩张,则释放 stksize := gp.stackAlloc if stksize != _FixedStack { // non-standard stack size - free it. stackfree(gp.stack, gp.stackAlloc) gp.stack.lo = 0 gp.stack.hi = 0 gp.stackguard0 = 0 gp.stkbar = nil gp.stkbarPos = 0 } else { // Reset stack barriers. gp.stkbar = gp.stkbar[:0] gp.stkbarPos = 0 }

// 放回 P 本地复用链表 gp.schedlink.set(p.gfree) p.gfree = gp p.gfreecnt++

// 如果本地复用对象过多,则转移一批到全局链表 if p.gfreecnt >= 64 { // 本地仅保留 32 个 for p.gfreecnt >= 32 { p.gfreecnt— gp = p.gfree p.gfree = gp.schedlink.ptr() gp.schedlink.set(sched.gfree) sched.gfree = gp sched.ngfree++ } } }

最初,G对象都是由malg创建的。

stack2.go

_StackMin = 2048

proc1.go

func malg(stacksize int32) *g { newg := new(g) if stacksize >= 0 { stacksize = round2(_StackSystem + stacksize) systemstack(func() { newg.stack, newg.stkbar = stackalloc(uint32(stacksize)) }) newg.stackguard0 = newg.stack.lo + _StackGuard newg.stackguard1 = ^uintptr(0) newg.stackAlloc = uintptr(stacksize) } return newg }

默认采用2KB栈空间,并且都被allg引用。这是垃圾回收遍历扫描的需要,以便获取指针引用,收缩栈空间。

proc1.go

var ( allg **g allglen uintptr allgs []*g )

func allgadd(gp *g) { allgs = append(allgs, gp) allg = &allgs[0] allglen = uintptr(len(allgs)) }

现在我们知道G的由来,以及复用方式。只是有个小问题,G似乎从来不被释放,会不会有存留过多的问题?不过好在垃圾回收会调用shrinkstack将其栈空间回收。有关栈的相关细节,留待后文再说。

在获取G对象后,newproc1会进行一系列初始化操作,毕竟不管新建还是复用,这些参数都必须正确地设置。同时,相关执行参数会被拷贝到G的栈空间,因为它和当前任务不再有任何关系,各自使用独立的栈空间。毕竟,“go func(…)”语句仅创建并发任务,当前流程会继续自己的逻辑。

创建完毕的G任务被优先放入P本地队列等待执行,这属于无锁操作。

proc1.go

func runqput(p *p, gp *g, next bool) { if randomizeScheduler && next && fastrand1()%2 == 0 { next = false }

// 如果可能,将 G 直接保存在 P.runnext,作为下一个优先执行任务 if next { retryNext: oldnext := p.runnext if !p.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return }

 // 原本的 next G 会被放回本地队列 
 gp = oldnext.ptr() 

}

retry: // runqhead 是一个数组实现的循环队列 // head、tail 累加,通过取模即可获得索引位置,很典型的算法 h := atomicload(&p.runqhead) t := p.runqtail

// 如果本地队列未满,直接放到尾部 if t-h < uint32(len(p.runq)) { p.runq[t%uint32(len(p.runq))] = gp atomicstore(&p.runqtail, t+1) return }

// 放入全局队列 // 因为需要加锁,所以 slow if runqputslow(p, gp, h, t) { return }

goto retry }

任务队列分为三级,按优先级从高到低分别是P.runnext、P.runq、Sched.runq,很有些CPU多级缓存的意思。

runtime2.go

type schedt struct { runqhead guintptr runqtail guintptr runqsize int32 }

type p struct { runqhead uint32 runqtail uint32 runq [256]*g // 本地队列,访问时无须加锁 runnext guintptr // 优先执行 }

type g struct { schedlink guintptr // 链表 }

往全局队列添加任务,显然需要加锁,只是专门取名为runqputslow就很有说法了。去看看到底怎么个慢法。

proc1.go

func runqputslow(p *p, gp *g, h, t uint32) bool { // 这意思显然是要从 P 本地转移一半任务到全局队列 // “+1” 是别忘了当前这个 gp var batch [len(p.runq)/2 + 1]*g

// 计算一半的实际数量 n := t - h n = n / 2

// 从队列头部提取 for i := uint32(0); i < n; i++ { batch[i] = p.runq[(h+i)%uint32(len(p.runq))] }

// 调整 P 队列头部位置 if !cas(&p.runqhead, h, h+n) { return false }

// 加上当前 gp 这家伙 batch[n] = gp

// 对顺序进行洗牌 if randomizeScheduler { for i := uint32(1); i n; i++ { j := fastrand1() % (i + 1) batch[i], batch[j] = batch[j], batch[i] } }

// 串成链表 for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) }

// 添加到全局队列尾部 globrunqputbatch(batch[0], batch[n], int32(n+1)) return true }

func globrunqputbatch(ghead *g, gtail *g, n int32) { gtail.schedlink = 0 if sched.runqtail != 0 { sched.runqtail.ptr().schedlink.set(ghead) } else { sched.runqhead.set(ghead) } sched.runqtail.set(gtail) sched.runqsize += n }

若本地队列已满,一次性转移半数到全局队列。这个好理解,因为其他P可能正饿着呢。这也正好解释了newproc1最后尝试用wakep唤醒其他M/P去执行任务的意图,毕竟充分发挥多核优势才是正途。

最后标记一下G的状态切换过程。

— gfree -----+ | IDLE DEAD RUNNABLE - RUNNING - DEAD --- … gfree 新建 初始化前 初始化后 调度执行 执行完毕